小坚的技术博客

go语言grpc学习笔记

本文作者:陈进坚
个人博客:https://jian1098.github.io
CSDN博客:https://blog.csdn.net/c_jian
简书:https://www.jianshu.com/u/8ba9ac5706b6
联系方式:jian1098@qq.com

grpc教程


视频:https://www.bilibili.com/video/BV1GE411A7kp

代码:https://github.com/zhuge20100104/grpc-demo

打开go-module


1
2
set GO111MODULE=on    //windows
export GO111MODULE=on //linux

编辑器settings-GO-Go Modules勾选

安装protoc


Linux

创建目录

1
2
mkdir proto
cd proto

打开https://github.com/protocolbuffers/protobuf/releases下载最新版本的protoc-*-linux-x86_64.zip

1
wget https://github.com/protocolbuffers/protobuf/releases/download/v3.12.3/protoc-3.12.3-linux-x86_64.zip

解压

1
unzip protoc-3.12.3-linux-x86_64.zip

添加到path

1
vim /etc/profile

把你的bin路径加到最后保存

1
export PATH=$PATH:/root/protoc/bin

刷新配置表

1
source /etc/profile

查看版本

1
protoc --version

Windows

打开https://github.com/protocolbuffers/protobuf/releases下载最新版本的protoc-*-win64.zip

新建一个文件夹并加压,然后把bin目录添加到环境变量即可

查看版本

1
protoc --version

安装protoc-gen-go


执行命令

1
go get -u github.com/golang/protobuf/protoc-gen-go

然后会在$GOPATH/bin目录下发现protoc-gen-go.exe

安装IDE插件


此步骤可选

在goland插件库中安装Protobuf Support

grpc流程


创建proto文件

创建pbfiles/Prod.proto文件,复制下面的代码保存

1
2
3
4
5
6
7
8
9
10
syntax="proto3";
option go_package = ".;services";
package services;

message ProdRequest{
int32 prod_id=1;
}
message ProResponse{
int32 pro_stock=1;
}

生成.pb.go文件

创建services目录然后在pbfiles目录下执行命令

1
protoc --go_out=../services Prod.proto

会得到services/Prod.pb.go文件

pbfiles/Prod.proto文件新增服务代码

1
2
3
service ProService{
rpc GetProStock (ProdRequest) returns (ProResponse);
}

执行下面的命令

1
protoc --go_out=plugins=grpc:../services Prod.proto

services/Prod.pb.go文件会生成更多的代码

创建业务逻辑文件

在生成的.pb.go文件中找到GetProdStock的接口,然后复制,创建services/ProdService.go,然后实现GetProdStock方法的具体逻辑

1
2
3
4
5
6
7
8
9
10
11
12
package services

import (
"context"
)

type ProdService struct {
}

func (this *ProdService) GetProdStock(ctx context.Context, request *ProdRequest) (*ProdResponse, error) {
return &ProdResponse{ProStock:20}, nil
}

创建服务端

创建server/server.go,并写入以下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
"google.golang.org/grpc"
"grpc-test/services"
"log"
"net"
)

func main() {
rpcServer := grpc.NewServer()
services.RegisterProdServiceServer(rpcServer, new(services.ProdService))
lis, err := net.Listen("tcp", ":8081")
if err != nil {
log.Fatal(err)
}

//tcp服务
err = rpcServer.Serve(lis)
if err != nil {
log.Fatal(err)
}
}

创建客户端

创建client/client.go,并写入以下代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package main

import (
"context"
"fmt"
"grpc-test/services"
"log"
"google.golang.org/grpc"
)

func main() {
conn, err := grpc.Dial(":8081", grpc.WithInsecure()) //grpc.WithInsecure():不使用证书
if err != nil {
log.Fatalf("连接GRPC服务端失败 %v\n", err)
}

defer conn.Close()
prodClient := services.NewProdServiceClient(conn)
prodRes, err := prodClient.GetProdStock(context.Background(),
&services.ProdRequest{ProdId: 12})

if err != nil {
log.Fatalf("请求GRPC服务端失败 %v\n", err)
}
fmt.Println(prodRes.ProStock)
}

启动服务

在命令行执行go run server/server.go,然后在另一个终端执行go run client/client.go即可

同时提供rpc和http服务

时提供rpchttp服务的grpc框架

https://github.com/grpc-ecosystem/grpc-gateway

第三方字段验证库

除了自行对参数字段进行验证,也可以选用第三方库验证字段

1
github.com/envoyproxy/protoc-gen-validate/validate

流模式


服务端流

User.proto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
syntax = "proto3";

package services;

import "Model.proto";

message UserScoreRequest {
repeated UserInfo users = 1;
}

message UserScoreResponse {
repeated UserInfo users = 1;
}

service UserService {
rpc GetUserScore(UserScoreRequest) returns (UserScoreResponse) {}
rpc GetUserScoreByServerStream(UserScoreRequest) returns (stream UserScoreResponse) {} //定义rpc服务
}

服务端UserService.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package services
import context "context"

type UserService struct{}

func (*UserService) GetUserScore(ctx context.Context, req *UserScoreRequest) (*UserScoreResponse, error) {
var score int32 = 100
users := make([]*UserInfo, 0)
for _, user := range req.Users {
user.UserScore = score
score++
users = append(users, user)
}
return &UserScoreResponse{Users: users}, nil
}

func (*UserService) GetUserScoreByServerStream(req *UserScoreRequest,
stream UserService_GetUserScoreByServerStreamServer) error {
var score int32 = 100
users := make([]*UserInfo, 0)
for index, user := range req.Users { //分批发送给客户端
user.UserScore = score
score++
users = append(users, user)
if (index+1)%2 == 0 && index > 0 {
err := stream.Send(&UserScoreResponse{Users: users})
if err != nil {
return err
}
users = users[0:0]
}

}
// 发送最后一批
if len(users) > 0 {
err := stream.Send(&UserScoreResponse{Users: users})
if err != nil {
return err
}
}
return nil
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package main

import (
"context"
"fmt"
"io"
"log"

"github.com/zhuge20100104/grpc-demo/grpc-13/client/helper"

"github.com/zhuge20100104/grpc-demo/grpc-13/client/services"

"google.golang.org/grpc"
)

func main() {

conn, err := grpc.Dial(":8081", grpc.WithTransportCredentials(helper.GetClientCredentials()))
if err != nil {
log.Fatalf("连接GRPC服务端失败 %v\n", err)
}

defer conn.Close()

userClient := services.NewUserServiceClient(conn)

users := make([]*services.UserInfo, 0)
var i int32 = 0
for i = 0; i < 6; i++ {
user := &services.UserInfo{UserId: i + 1}
users = append(users, user)
}

stream, err := userClient.GetUserScoreByServerStream(context.Background(),
&services.UserScoreRequest{Users: users},
)

if err != nil {
log.Fatalf("请求GRPC服务端失败 %v\n", err)
}

for {
userRes, err := stream.Recv() //读取流数据
if err == io.EOF {
break
}
if err != nil {
fmt.Printf("读取服务端流失败 err: %v\n", err.Error())
}
fmt.Println(userRes.Users)
}
}

客户端流

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package services

import (
context "context"
"io"
)

type UserService struct{}

func (*UserService) GetUserScore(ctx context.Context, req *UserScoreRequest) (*UserScoreResponse, error) {
var score int32 = 100
users := make([]*UserInfo, 0)
for _, user := range req.Users {
user.UserScore = score
score++
users = append(users, user)
}

return &UserScoreResponse{Users: users}, nil
}

func (*UserService) GetUserScoreByServerStream(req *UserScoreRequest,
stream UserService_GetUserScoreByServerStreamServer) error {
var score int32 = 100
users := make([]*UserInfo, 0)
for index, user := range req.Users {
user.UserScore = score
score++
users = append(users, user)
if (index+1)%2 == 0 && index > 0 {
err := stream.Send(&UserScoreResponse{Users: users})
if err != nil {
return err
}
users = users[0:0]
}

}
// 发送最后一批
if len(users) > 0 {
err := stream.Send(&UserScoreResponse{Users: users})
if err != nil {
return err
}
}

return nil
}

func (*UserService) GetUserScoreByClientStream(stream UserService_GetUserScoreByClientStreamServer) error {
users := make([]*UserInfo, 0)
var score int32 = 100
for {
req, err := stream.Recv()
if err == io.EOF {
err = stream.SendAndClose(&UserScoreResponse{Users: users})
return err
}

if err != nil {
return err
}

for _, user := range req.Users {
user.UserScore = score
users = append(users, user)
score++
}
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package main

import (
"context"
"fmt"
"log"

"github.com/zhuge20100104/grpc-demo/grpc-14/client/helper"

"github.com/zhuge20100104/grpc-demo/grpc-14/client/services"

"google.golang.org/grpc"
)

func main() {

conn, err := grpc.Dial(":8081", grpc.WithTransportCredentials(helper.GetClientCredentials()))
if err != nil {
log.Fatalf("连接GRPC服务端失败 %v\n", err)
}

defer conn.Close()

userClient := services.NewUserServiceClient(conn)

users := make([]*services.UserInfo, 0)
var i int32 = 0
for i = 0; i < 6; i++ {
user := &services.UserInfo{UserId: i + 1}
users = append(users, user)
}

stream, err := userClient.GetUserScoreByClientStream(context.Background())

if err != nil {
log.Fatalf("请求GRPC服务端失败 %v\n", err)
}

for i := 0; i < 3; i++ {
req := new(services.UserScoreRequest)
req.Users = make([]*services.UserInfo, 0)
var j int32
for j = 1; j <= 5; j++ {
req.Users = append(req.Users, &services.UserInfo{UserId: j})
}
stream.Send(req)
}

res, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("接收服务端请求失败 %v\n", err)
}

for _, user := range res.Users {
fmt.Println(user)
}

}

双向流

服务端UserService.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
package services

import (
context "context"
"io"
)

type UserService struct{}

func (*UserService) GetUserScore(ctx context.Context, req *UserScoreRequest) (*UserScoreResponse, error) {
var score int32 = 100
users := make([]*UserInfo, 0)
for _, user := range req.Users {
user.UserScore = score
score++
users = append(users, user)
}
return &UserScoreResponse{Users: users}, nil
}

func (*UserService) GetUserScoreByServerStream(req *UserScoreRequest,
stream UserService_GetUserScoreByServerStreamServer) error {
var score int32 = 100
users := make([]*UserInfo, 0)
for index, user := range req.Users {
user.UserScore = score
score++
users = append(users, user)
if (index+1)%2 == 0 && index > 0 {
err := stream.Send(&UserScoreResponse{Users: users})
if err != nil {
return err
}
users = users[0:0]
}
}
// 发送最后一批
if len(users) > 0 {
err := stream.Send(&UserScoreResponse{Users: users})
if err != nil {
return err
}
}
return nil
}

func (*UserService) GetUserScoreByClientStream(stream UserService_GetUserScoreByClientStreamServer) error {
users := make([]*UserInfo, 0)
var score int32 = 100
for {
req, err := stream.Recv()
if err == io.EOF {
err = stream.SendAndClose(&UserScoreResponse{Users: users})
return err
}

if err != nil {
return err
}

for _, user := range req.Users {
user.UserScore = score
users = append(users, user)
score++
}
}
}

func (*UserService) GetUserScoreByTWS(stream UserService_GetUserScoreByTWSServer) error {
users := make([]*UserInfo, 0)
var score int32 = 100
for {
req, err := stream.Recv()
if err == io.EOF {
return nil
}

if err != nil {
return err
}

for _, user := range req.Users {
user.UserScore = score
users = append(users, user)
score++
}

stream.Send(&UserScoreResponse{Users: users})
users = users[0:0]
}
}

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package main

import (
"context"
"fmt"
"io"
"log"
"github.com/zhuge20100104/grpc-demo/grpc-15/client/helper"
"github.com/zhuge20100104/grpc-demo/grpc-15/client/services"
"google.golang.org/grpc"
)

func main() {
conn, err := grpc.Dial(":8081", grpc.WithTransportCredentials(helper.GetClientCredentials()))
if err != nil {
log.Fatalf("连接GRPC服务端失败 %v\n", err)
}

defer conn.Close()

userClient := services.NewUserServiceClient(conn)

users := make([]*services.UserInfo, 0)
var i int32 = 0
for i = 0; i < 6; i++ {
user := &services.UserInfo{UserId: i + 1}
users = append(users, user)
}

stream, err := userClient.GetUserScoreByTWS(context.Background())

if err != nil {
log.Fatalf("请求GRPC服务端失败 %v\n", err)
}

for i := 0; i < 3; i++ {
req := new(services.UserScoreRequest)
req.Users = make([]*services.UserInfo, 0)
var j int32
for j = 1; j <= 5; j++ {
req.Users = append(req.Users, &services.UserInfo{UserId: j})
}
stream.Send(req)

res, err := stream.Recv()
if err == io.EOF {
break
}

if err != nil {
log.Fatalf("接收服务端请求失败 %v\n", err)
}
fmt.Println(res.Users)
}
}
-------------本文结束感谢您的阅读-------------
🐶 您的支持将鼓励我继续创作 🐶